package com.bbx.flink.demo.allow_latenss;


import com.bbx.flink.demo.entity.Temperature;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * 本Demo功能点
 * 1、设置Event Time 的watermark ，
 * 2、侧输出流，等待迟到的数据
 *
 * 注意点：
 * 1、进哪个窗口是由 Event Time 决定的
 * 2、watermark 只是用来表示 标记时间内的元素已经到达，
 * 3、窗口开始时间计算公式 ：timestamp - (timestamp - offset + windowSize) % windowSize
 *      windowSize 为窗口大小
 *      timestamp 为 Event Time
 *      offset ： 时间偏移（主要用于时区调整）
 *      例如：以下面程序为例，windowSize 为：15s ， offset ：0，watermark 延迟时间为 2s ，输入的元素如下：
 *          1,1607654003161,11    0
 *          1,1607654004161,22    1
 *          1,1607654008161,19    5
 *          1,1607654009161,16    6
 *          1,1607654010161,17    7
 *          1,1607654011161,17    8
 *          1,1607654012161,27    9
 *
 *          1,1607654013161,17    10
 *      输入第一个元素的 event time为 1607654003161 ms ，offset 为0
 *      则窗口开始时间为 ：1607654003161-（1607654003161-0+15000）%15000=1607653995000
 *      因此范围为 第一个窗口为 [1607653995000   1607654010000)
 *               第二个窗口为 [1607654010000   1607654025000)
 *                   .......
 *       ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 *                    窗口内数据        watermark 时间
*       第一个窗口 1,1607654003161,11   1607654001161
 *               1,1607654004161,22   1607654002161
 *               1,1607654008161,19   1607654006161
 *               1,1607654009161,16   1607654007161
*       第二个窗口 1,1607654010161,17   1607654008161
 *               1,1607654011161,17   1607654009161
 *               1,1607654012161,27   1607654010161
 *               1,1607654013161,17   1607654011161
*      因为watermark延迟时间为2 s ，因此第一个窗口关闭时间为 1607654012000，当元素 1,1607654012161,27  抵达时第一个窗口开始关闭
 */
@Slf4j
public class SideOutDemo {

    public static void  main (String [] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置流的时间特征，使用Event Time 必须要设置
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        //设置watermark 的时间间隔，有默认值
//        env.getConfig().setAutoWatermarkInterval(1L);

        OutputTag<Temperature> sideWindow = new OutputTag<>("sideWindow", PojoTypeInfo.of(Temperature.class));

        SingleOutputStreamOperator<Temperature> reduce = env.socketTextStream("192.168.10.131", 10003)
                .map((MapFunction<String, Temperature>) elment -> {
                    String[] varElment = elment.split(",");
                    return new Temperature(varElment[0], Long.parseLong(varElment[2]), Long.parseLong(varElment[1]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Temperature>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((i, timestamp) -> i.getTime())
//                        .withIdleness(Duration.ofSeconds(3))

                )
                .keyBy(i -> i.getId())
                .timeWindow(Time.seconds(15))
                //设定等待迟到数据的时间
                .allowedLateness(Time.seconds(5L))
                //为迟到数据开启侧输出流
                .sideOutputLateData(sideWindow)
                .max("tem");

        reduce.print("~~~~~~~~~");
        reduce.getSideOutput(sideWindow).print("######");
        env.execute();
    }





}
